package com.lagou.es_springcloud.consumer;

import com.lagou.es_springcloud.pojo.Products;
import com.lagou.es_springcloud.repository.ProductsRepository;
import com.lagou.es_springcloud.service.ProductsService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ProductsConsumer {

    @Autowired
    private ProductsService productsService;

    @Autowired
    private ProductsRepository productsRepository;

    @KafkaListener(topics = "lagou")
    public void listen (ConsumerRecord<String, String> record){
        System.out.println(String.format("topic:%s, offset:%d, msg:%s", record.topic(), record.offset(), record.value()));
        Products products = productsService.findById(Integer.parseInt(record.value()));
        productsRepository.save(products);
        System.out.println("save success !");
    }
}
